{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Data passing tutorial\n",
    "Data passing is the most important aspect of Pipelines.\n",
    "\n",
    "In Kubeflow Pipelines, the pipeline authors compose pipelines by creating component instances (tasks) and connecting them together.\n",
    "\n",
    "Components have inputs and outputs. They can consume and produce arbitrary data.\n",
    "\n",
    "Pipeline authors connect component tasks through their data inputs and outputs: the output of one task is passed as an argument to another task's input.\n",
    "\n",
    "The system takes care of storing the data produced by components and later passing that data to other components for consumption as instructed by the pipeline.\n",
    "\n",
    "This tutorial shows how to create Python components that produce, consume and transform data.\n",
    "It shows how to create data passing pipelines by instantiating components and connecting them together."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Put your KFP cluster endpoint URL here if working from GCP notebooks (or local notebooks). ('https://xxxxx.notebooks.googleusercontent.com/')\n",
    "kfp_endpoint='https://XXXXX.{pipelines|notebooks}.googleusercontent.com/'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Install the Kubeflow Pipelines SDK. The --user argument avoids permission errors; drop it when installing inside a virtual environment.\n",
    "!PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install 'kfp>=1.4.0' --quiet --user"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from typing import NamedTuple\n",
    "\n",
    "import kfp\n",
    "from kfp.components import InputPath, InputTextFile, OutputPath, OutputTextFile\n",
    "from kfp.components import func_to_container_op"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Small data\n",
    "\n",
    "Small data is data that you would be comfortable passing as a program's command-line argument. Its size should not exceed a few kilobytes.\n",
    "\n",
    "Some examples of typical types of small data are: number, URL, small string (e.g. column name).\n",
    "\n",
    "Small lists, dictionaries and JSON structures are fine, but keep an eye on the size and consider switching to file-based data passing methods that are more suitable for bigger data (more than several kilobytes) or binary data.\n",
    "\n",
    "All small data outputs are eventually serialized to strings, and all small data input values are eventually deserialized from strings (passed as command-line arguments). There are built-in serializers and deserializers for several common types (e.g. `str`, `int`, `float`, `bool`, `list`, `dict`). All other types of data need to be serialized manually before returning the data. Make sure to specify type annotations properly; otherwise there is no automatic deserialization and the component function receives strings instead of deserialized objects."
   ]
  },
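  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The string round trip described above can be illustrated without a cluster. The sketch below is a stand-in under stated assumptions, not the SDK's actual serializer: it uses the standard `json` module and a hypothetical `pass_as_argument` helper to mimic a small value being written out as a command-line string and read back, with and without a type annotation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "\n",
    "def pass_as_argument(value, annotated_type=None):\n",
    "    '''Hypothetical helper (not part of kfp): mimics passing a small value as a string.'''\n",
    "    # What the command line would carry: strings as-is, everything else as JSON text\n",
    "    serialized = value if isinstance(value, str) else json.dumps(value)\n",
    "    if annotated_type is None or annotated_type is str:\n",
    "        return serialized  # No annotation: the consumer receives the raw string\n",
    "    return annotated_type(json.loads(serialized))  # Annotation present: the string is parsed back\n",
    "\n",
    "print(repr(pass_as_argument(42, int)))          # 42\n",
    "print(repr(pass_as_argument(42)))               # '42'\n",
    "print(repr(pass_as_argument([1, 2, 3], list)))  # [1, 2, 3]"
   ]
  },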
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Consuming small data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@func_to_container_op\n",
    "def print_small_text(text: str):\n",
    "    '''Print small text'''\n",
    "    print(text)\n",
    "\n",
    "def constant_to_consumer_pipeline():\n",
    "    '''Pipeline that passes a small constant string to the consumer'''\n",
    "    consume_task = print_small_text('Hello world') # Passing constant as argument to consumer\n",
    "\n",
    "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(constant_to_consumer_pipeline, arguments={})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def pipeline_parameter_to_consumer_pipeline(text: str):\n",
    "    '''Pipeline that passes a small pipeline parameter string to the consumer'''\n",
    "    consume_task = print_small_text(text) # Passing pipeline parameter as argument to consumer\n",
    "\n",
    "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
    "    pipeline_parameter_to_consumer_pipeline,\n",
    "    arguments={'text': 'Hello world'}\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Producing small data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@func_to_container_op\n",
    "def produce_one_small_output() -> str:\n",
    "    return 'Hello world'\n",
    "\n",
    "def task_output_to_consumer_pipeline():\n",
    "    '''Pipeline that passes small data from producer to consumer'''\n",
    "    produce_task = produce_one_small_output()\n",
    "    # Passing producer task output as argument to consumer\n",
    "    consume_task1 = print_small_text(produce_task.output) # task.output only works for single-output components\n",
    "    consume_task2 = print_small_text(produce_task.outputs['output']) # task.outputs[...] always works\n",
    "\n",
    "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(task_output_to_consumer_pipeline, arguments={})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Producing and consuming multiple arguments"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@func_to_container_op\n",
    "def produce_two_small_outputs() -> NamedTuple('Outputs', [('text', str), ('number', int)]):\n",
    "    return (\"data 1\", 42)\n",
    "\n",
    "@func_to_container_op\n",
    "def consume_two_arguments(text: str, number: int):\n",
    "    print('Text={}'.format(text))\n",
    "    print('Number={}'.format(str(number)))\n",
    "\n",
    "def producers_to_consumers_pipeline(text: str = \"Hello world\"):\n",
    "    '''Pipeline that passes data from producer to consumer'''\n",
    "    produce1_task = produce_one_small_output()\n",
    "    produce2_task = produce_two_small_outputs()\n",
    "\n",
    "    consume_task1 = consume_two_arguments(produce1_task.output, 42)\n",
    "    consume_task2 = consume_two_arguments(text, produce2_task.outputs['number'])\n",
    "    consume_task3 = consume_two_arguments(produce2_task.outputs['text'], produce2_task.outputs['number'])\n",
    "\n",
    "\n",
    "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(producers_to_consumers_pipeline, arguments={})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Consuming and producing data at the same time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@func_to_container_op\n",
    "def get_item_from_list(list_of_strings: list, index: int) -> str:\n",
    "    return list_of_strings[index]\n",
    "\n",
    "@func_to_container_op\n",
    "def truncate_text(text: str, max_length: int) -> str:\n",
    "    return text[0:max_length]\n",
    "\n",
    "def processing_pipeline(text: str = \"Hello world\"):\n",
    "    truncate_task = truncate_text(text, max_length=5)\n",
    "    get_item_task = get_item_from_list(list_of_strings=[3, 1, truncate_task.output, 1, 5, 9, 2, 6, 7], index=2)\n",
    "    print_small_text(get_item_task.output)\n",
    "\n",
    "\n",
    "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(processing_pipeline, arguments={})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Bigger data (files)\n",
    "\n",
    "Bigger data should be read from files and written to files.\n",
    "\n",
    "The paths for the input and output files are chosen by the system and are passed into the function (as strings).\n",
    "\n",
    "Use the `InputPath` parameter annotation to tell the system that the function wants to consume the corresponding input data as a file. The system will download the data, write it to a local file and then pass the **path** of that file to the function.\n",
    "\n",
    "Use the `OutputPath` parameter annotation to tell the system that the function wants to produce the corresponding output data as a file. The system will prepare and pass the **path** of a file where the function should write the output data. After the function exits, the system will upload the data to the storage system so that it can be passed to downstream components.\n",
    "\n",
    "You can specify the type of the consumed/produced data by passing the type argument to `InputPath` and `OutputPath`. The type can be a Python type or an arbitrary type name string. `OutputPath('TFModel')` means that the function states that the data it has written to a file has type 'TFModel'. `InputPath('TFModel')` means that the function states that it expects the data it reads from a file to have type 'TFModel'. When the pipeline author connects inputs to outputs, the system checks whether the types match.\n",
    "\n",
    "Note on input/output names: When the function is converted to a component, the input and output names generally follow the parameter names, but the \"\\_path\" and \"\\_file\" suffixes are stripped from file/path inputs and outputs. E.g. the `number_file_path: InputPath(int)` parameter becomes the `number: int` input. This makes the argument passing look more natural: `number=42` instead of `number_file_path=42`."
   ]
  },
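  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The naming rule in the note above can be sketched in plain Python. The `io_name_from_parameter` helper below is hypothetical and only mirrors the rule as stated in this tutorial (strip a trailing `_path`, then a trailing `_file`); it is not the SDK's implementation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def io_name_from_parameter(parameter_name: str) -> str:\n",
    "    '''Hypothetical helper (not part of kfp): derives an input/output name from a file/path parameter name.'''\n",
    "    name = parameter_name\n",
    "    for suffix in ('_path', '_file'):\n",
    "        if name.endswith(suffix):\n",
    "            name = name[:-len(suffix)]\n",
    "    return name\n",
    "\n",
    "print(io_name_from_parameter('number_file_path'))  # number\n",
    "print(io_name_from_parameter('odd_lines_path'))    # odd_lines\n",
    "print(io_name_from_parameter('source_file'))       # source"
   ]
  },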
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Writing and reading bigger data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Writing bigger data\n",
    "@func_to_container_op\n",
    "def repeat_line(line: str, output_text_path: OutputPath(str), count: int = 10):\n",
    "    '''Repeat the line the specified number of times'''\n",
    "    with open(output_text_path, 'w') as writer:\n",
    "        for i in range(count):\n",
    "            writer.write(line + '\\n')\n",
    "\n",
    "\n",
    "# Reading bigger data\n",
    "@func_to_container_op\n",
    "def print_text(text_path: InputPath()): # The \"text\" input is untyped so that any data can be printed\n",
    "    '''Print text'''\n",
    "    with open(text_path, 'r') as reader:\n",
    "        for line in reader:\n",
    "            print(line, end = '')\n",
    "\n",
    "def print_repeating_lines_pipeline():\n",
    "    repeat_lines_task = repeat_line(line='Hello', count=5000)\n",
    "    print_text(repeat_lines_task.output) # Don't forget .output !\n",
    "\n",
    "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(print_repeating_lines_pipeline, arguments={})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Processing bigger data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@func_to_container_op\n",
    "def split_text_lines(source_path: InputPath(str), odd_lines_path: OutputPath(str), even_lines_path: OutputPath(str)):\n",
    "    # Odd-numbered lines (1st, 3rd, ...) go to one output file, even-numbered lines to the other.\n",
    "    with open(source_path, 'r') as reader, open(odd_lines_path, 'w') as odd_writer, open(even_lines_path, 'w') as even_writer:\n",
    "        while True:\n",
    "            line = reader.readline()\n",
    "            if line == \"\":  # readline() returns an empty string at end of file\n",
    "                break\n",
    "            odd_writer.write(line)\n",
    "            line = reader.readline()\n",
    "            if line == \"\":\n",
    "                break\n",
    "            even_writer.write(line)\n",
    "\n",
    "def text_splitting_pipeline():\n",
    "    text = '\\n'.join(['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten'])\n",
    "    split_text_task = split_text_lines(text)\n",
    "    print_text(split_text_task.outputs['odd_lines'])\n",
    "    print_text(split_text_task.outputs['even_lines'])\n",
    "\n",
    "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(text_splitting_pipeline, arguments={})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Processing bigger data with pre-opened files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@func_to_container_op\n",
    "def split_text_lines2(source_file: InputTextFile(str), odd_lines_file: OutputTextFile(str), even_lines_file: OutputTextFile(str)):\n",
    "    while True:\n",
    "        line = source_file.readline()\n",
    "        if line == \"\":\n",
    "            break\n",
    "        odd_lines_file.write(line)\n",
    "        line = source_file.readline()\n",
    "        if line == \"\":\n",
    "            break\n",
    "        even_lines_file.write(line)\n",
    "\n",
    "def text_splitting_pipeline2():\n",
    "    text = '\\n'.join(['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten'])\n",
    "    split_text_task = split_text_lines2(text)\n",
    "    print_text(split_text_task.outputs['odd_lines']).set_display_name('Odd lines')\n",
    "    print_text(split_text_task.outputs['even_lines']).set_display_name('Even lines')\n",
    "\n",
    "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(text_splitting_pipeline2, arguments={})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Example: Pipeline that generates then sums many numbers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Writing many numbers\n",
    "@func_to_container_op\n",
    "def write_numbers(numbers_path: OutputPath(str), start: int = 0, count: int = 10):\n",
    "    with open(numbers_path, 'w') as writer:\n",
    "        # Write `count` consecutive integers beginning at `start`\n",
    "        for i in range(start, start + count):\n",
    "            writer.write(str(i) + '\\n')\n",
    "\n",
    "\n",
    "# Reading and summing many numbers\n",
    "@func_to_container_op\n",
    "def sum_numbers(numbers_path: InputPath(str)) -> int:\n",
    "    total = 0  # Avoid shadowing the built-in sum()\n",
    "    with open(numbers_path, 'r') as reader:\n",
    "        for line in reader:\n",
    "            total = total + int(line)\n",
    "    return total\n",
    "\n",
    "\n",
    "\n",
    "# Pipeline to sum 100000 numbers\n",
    "def sum_pipeline(count: 'Integer' = 100000):\n",
    "    numbers_task = write_numbers(count=count)\n",
    "    print_text(numbers_task.output)\n",
    "\n",
    "    sum_task = sum_numbers(numbers_task.outputs['numbers'])\n",
    "    print_text(sum_task.output)\n",
    "\n",
    "\n",
    "# Running the pipeline\n",
    "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(sum_pipeline, arguments={})"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
